Feat (Core): PostgresJobQueue Implementation and Job Processing Enhancements #30175

@jgambarios jgambarios commented Sep 27, 2024

This PR introduces the PostgresJobQueue implementation, fulfilling the requirement for a default job queue using PostgreSQL.
In the process of developing this implementation, we've made significant improvements to the overall job processing system, including enhancements to the JobQueueManagerAPIImpl and related components.

Key Features of PostgresJobQueue:

  1. Efficient Job Management: Implements all required operations for job queue management, including creating, retrieving, updating, and monitoring jobs.
  2. JSONB Field Usage: Utilizes PostgreSQL's JSONB fields for storing job parameters and results, allowing for flexible and efficient data storage.
  3. Optimized Queries: Implements efficient SQL queries, including the use of SELECT FOR UPDATE SKIP LOCKED for concurrent job processing.
  4. Pagination Support: Provides paginated results for job listings, improving performance for large job sets.
  5. Comprehensive Job Lifecycle Management: Handles all stages of a job's lifecycle, from creation to completion or failure.
  6. Transactional Integrity: Ensures data consistency through proper transaction management.
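
As an illustration of item 3, here is a minimal sketch of how a worker might claim the next pending job with `SELECT ... FOR UPDATE SKIP LOCKED`. It is not the code from this PR: the table `job_queue`, its columns (`id` as text, `state`, `created_at`, `started_at`), and the class and method names are all assumptions made for the example.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

// Sketch only: table name, column names and state values are hypothetical.
final class JobClaimSketch {

    // Rows already locked by another worker's transaction are skipped
    // instead of waited on, so concurrent workers never claim the same job.
    static final String SELECT_NEXT =
            "SELECT id FROM job_queue "
          + "WHERE state = 'PENDING' "
          + "ORDER BY created_at "
          + "LIMIT 1 "
          + "FOR UPDATE SKIP LOCKED";

    static final String MARK_RUNNING =
            "UPDATE job_queue SET state = 'RUNNING', started_at = now() WHERE id = ?";

    // Claims at most one pending job and marks it RUNNING in one transaction.
    static Optional<String> claimNextJob(Connection conn) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // the row lock lives until commit/rollback
        try {
            String jobId = null;
            try (PreparedStatement select = conn.prepareStatement(SELECT_NEXT);
                 ResultSet rs = select.executeQuery()) {
                if (rs.next()) {
                    jobId = rs.getString("id");
                }
            }
            if (jobId != null) {
                try (PreparedStatement update = conn.prepareStatement(MARK_RUNNING)) {
                    update.setString(1, jobId);
                    update.executeUpdate();
                }
            }
            conn.commit();
            return Optional.ofNullable(jobId);
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

The select and the update share one transaction so that the row lock taken by `FOR UPDATE` is still held when the state changes; an empty `Optional` means no unlocked pending job was available.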

Improvements to Job Processing System:

  1. Enhanced Error Handling: Implemented a more robust error handling mechanism, including the use of custom exceptions for different error scenarios.
  2. Improved Retry Logic: Refined the retry mechanism to handle transient failures more gracefully.
  3. Real-Time Job Monitoring: Enhanced the real-time monitoring capabilities, allowing for more responsive job status updates.
  4. Progress Tracking: Improved the progress tracking system, providing more accurate and timely updates on job completion percentage.
  5. Cancellation Support: Implemented a more reliable job cancellation process.
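
To make the retry item concrete, the following is a rough sketch of an exponential backoff strategy shaped like the `shouldRetry` / `nextRetryDelay` pair in the class diagram. It is simplified so it compiles on its own: it takes a retry count instead of a `Job`, and the class name, constructor parameters, and the choice to treat `IOException` subclasses as transient are assumptions, not behavior confirmed by this PR.

```java
import java.io.IOException;

// Sketch only: a standalone stand-in for a RetryStrategy implementation.
final class ExponentialBackoffSketch {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxRetries;

    ExponentialBackoffSketch(long initialDelayMs, long maxDelayMs, int maxRetries) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxRetries = maxRetries;
    }

    // Retry only failures assumed to be transient, and only up to maxRetries.
    boolean shouldRetry(int retryCount, Class<? extends Throwable> exceptionClass) {
        boolean transientFailure = IOException.class.isAssignableFrom(exceptionClass);
        return transientFailure && retryCount < maxRetries;
    }

    // Doubles the delay for each previous retry, capped at maxDelayMs.
    long nextRetryDelay(int retryCount) {
        long delay = initialDelayMs;
        for (int i = 0; i < retryCount && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```

With an initial delay of 1000 ms and a cap of 60000 ms, successive retries would wait 1000, 2000, 4000 ms and so on until the cap is reached.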

JobQueueManagerAPIImpl Enhancements:

  1. Event-Driven Updates: Implemented an event-driven system for job status updates, improving system responsiveness.
  2. More Granular Job State Transitions: Implemented more detailed job state transitions, providing better insights into job processing stages.
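
The event-driven update path can be pictured with a small watcher registry in the spirit of `RealTimeJobMonitor` (`registerWatcher` / `updateWatchers` in the diagram). This is a sketch under assumptions: `JobSnapshot` is a hypothetical stand-in for `Job`, the state names are illustrative, and dropping watchers once a job reaches a terminal state is a design choice of the example rather than something stated in the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, minimal stand-in for the Job class.
final class JobSnapshot {
    final String id;
    final String state;
    final float progress;

    JobSnapshot(String id, String state, float progress) {
        this.id = id;
        this.state = state;
        this.progress = progress;
    }
}

// Sketch only: maps job ids to the callbacks interested in them.
final class JobWatcherRegistrySketch {
    private final Map<String, List<Consumer<JobSnapshot>>> watchers = new ConcurrentHashMap<>();

    void registerWatcher(String jobId, Consumer<JobSnapshot> watcher) {
        watchers.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).add(watcher);
    }

    // Pushes each updated job to its watchers; no polling by the watcher.
    void updateWatchers(List<JobSnapshot> updatedJobs) {
        for (JobSnapshot job : updatedJobs) {
            List<Consumer<JobSnapshot>> jobWatchers = watchers.get(job.id);
            if (jobWatchers == null) {
                continue;
            }
            for (Consumer<JobSnapshot> watcher : jobWatchers) {
                watcher.accept(job);
            }
            if (isTerminal(job.state)) {
                watchers.remove(job.id); // assumption: no updates follow a terminal state
            }
        }
    }

    private static boolean isTerminal(String state) {
        return "COMPLETED".equals(state) || "FAILED".equals(state) || "CANCELLED".equals(state);
    }
}
```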

Testing and Documentation:

Developed an extensive set of unit and integration tests for the PostgresJobQueue implementation.

Simple class diagram with main operations and interactions

classDiagram
    class JobQueueManagerAPI {
        <<interface>>
        +start()
        +close()
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getJobs(page: int, pageSize: int): JobPaginatedResult
        +cancelJob(jobId: String)
        +watchJob(jobId: String, watcher: Consumer~Job~)
        +setRetryStrategy(queueName: String, retryStrategy: RetryStrategy)
    }

    class JobQueueManagerAPIImpl {
        -jobQueue: JobQueue
        -processors: Map~String, JobProcessor~
        -retryStrategies: Map~String, RetryStrategy~
        -circuitBreaker: CircuitBreaker
        +start()
        +close()
        -processJobs()
        -processJobWithRetry(job: Job)
    }

    class JobQueue {
        <<interface>>
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getActiveJobs(queueName: String, page: int, pageSize: int): JobPaginatedResult
        +getCompletedJobs(queueName: String, startDate: LocalDateTime, endDate: LocalDateTime, page: int, pageSize: int): JobPaginatedResult
        +updateJobStatus(job: Job)
        +nextJob(): Job
        +updateJobProgress(jobId: String, progress: float)
    }

    class PostgresJobQueue {
        -objectMapper: ObjectMapper
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getActiveJobs(queueName: String, page: int, pageSize: int): JobPaginatedResult
        +getCompletedJobs(queueName: String, startDate: LocalDateTime, endDate: LocalDateTime, page: int, pageSize: int): JobPaginatedResult
        +updateJobStatus(job: Job)
        +nextJob(): Job
        +updateJobProgress(jobId: String, progress: float)
    }

    class Job {
        +id: String
        +queueName: String
        +state: JobState
        +parameters: Map
        +progress: float
        +createdAt: LocalDateTime
        +updatedAt: LocalDateTime
        +startedAt: Optional~LocalDateTime~
        +completedAt: Optional~LocalDateTime~
        +result: Optional~JobResult~
    }

    class JobProcessor {
        <<interface>>
        +process(job: Job)
        +getResultMetadata(job: Job): Map
    }

    class Cancellable {
        <<interface>>
        +cancel(job: Job)
    }

    class RetryStrategy {
        <<interface>>
        +shouldRetry(job: Job, exceptionClass: Class): boolean
        +nextRetryDelay(job: Job): long
    }

    class CircuitBreaker {
        -failureThreshold: int
        -resetTimeout: long
        +allowRequest(): boolean
        +recordFailure()
        +reset()
    }

    class RealTimeJobMonitor {
        -jobWatchers: Map~String, List~Consumer~Job~~~
        +registerWatcher(jobId: String, watcher: Consumer~Job~)
        +updateWatchers(updatedJobs: List~Job~)
    }

    JobQueueManagerAPI <|.. JobQueueManagerAPIImpl
    JobQueueManagerAPIImpl --> JobQueue
    JobQueueManagerAPIImpl --> JobProcessor
    JobQueueManagerAPIImpl --> RetryStrategy
    JobQueueManagerAPIImpl --> CircuitBreaker
    JobQueueManagerAPIImpl --> RealTimeJobMonitor
    JobQueue <|.. PostgresJobQueue
    JobQueueManagerAPIImpl ..> Job
    JobProcessor <|-- Cancellable
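
The `CircuitBreaker` box in the diagram lists only two fields and three methods. One plausible reading of that shape is sketched below; the injected clock, the millisecond unit, and the rule that the breaker closes again once `resetTimeout` has elapsed are assumptions of this example, not details taken from the PR.

```java
import java.util.function.LongSupplier;

// Sketch only: opens after N failures, closes again after a timeout.
final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long resetTimeoutMs;
    private final LongSupplier clock; // injected so the timeout is testable
    private int failureCount;
    private long openedAt;
    private boolean open;

    CircuitBreakerSketch(int failureThreshold, long resetTimeoutMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
        this.clock = clock;
    }

    // Returns false while the breaker is open and the timeout has not elapsed.
    synchronized boolean allowRequest() {
        if (open && clock.getAsLong() - openedAt >= resetTimeoutMs) {
            reset();
        }
        return !open;
    }

    // Counts a failure and opens the breaker once the threshold is reached.
    synchronized void recordFailure() {
        failureCount++;
        if (!open && failureCount >= failureThreshold) {
            open = true;
            openedAt = clock.getAsLong();
        }
    }

    synchronized void reset() {
        failureCount = 0;
        open = false;
    }
}
```

In production the clock would typically be `System::currentTimeMillis`; a job-processing loop would call `allowRequest()` before fetching the next job and `recordFailure()` when processing throws.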

This PR fixes: #29479

Introduced multiple job lifecycle event classes including `JobCreatedEvent`, `JobStartedEvent`, `JobProgressUpdatedEvent`, `JobCompletedEvent`, `JobFailedEvent`, and `JobCancelledEvent`. Updated `JobQueueManagerAPIImpl` to fire these events at appropriate stages in the job lifecycle to enhance observability and event-driven processing. Updated tests to accommodate the changes in event handling.
Replaced jobId strings with Job objects in event classes to simplify data access and reduce redundancy. Introduced RealTimeJobMonitor for handling real-time job updates, improving event notification efficiency and code maintainability. Updated tests and related classes to accommodate these changes.
Remove the `JobResult` enum and update job processing methods to use an `AbstractJobResult` interface. This refactor centralizes job result handling, differentiates success, failure, and cancellation cases, and incorporates metadata and error details within the job result.
…a-default-JobQueue-Implementation-using-Postgres
This commit adds a PostgreSQL-specific implementation of the JobQueue interface, providing detailed methods for job management using a PostgreSQL database. It also introduces new error handling classes like JobQueueException, JobQueueDataException, and renames ProcessorNotFoundException to JobProcessorNotFoundException for clarity.
Updated job queries to use common table expressions (CTEs) for pagination, ensuring all job retrieval methods now return a JobPaginatedResult object containing job data and pagination details. This improves the consistency and handling of job data across the application. Added new tests to verify the correctness of these changes.
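
A CTE-based paginated query of the kind this commit describes might look roughly like the sketch below, which returns one page of rows together with the total count in a single round trip. The table and column names, the CTE names, the state values, and the 1-based page convention are all assumptions for illustration; the actual queries in the PR may differ.

```java
// Sketch only: hypothetical schema and page convention.
final class PaginatedQuerySketch {

    // total_jobs counts every matching row; paged_jobs holds one page.
    // The LEFT JOIN keeps the count row even when the page is empty.
    static final String ACTIVE_JOBS_PAGE =
            "WITH total_jobs AS ("
          + "  SELECT COUNT(*) AS total_count FROM job_queue"
          + "  WHERE queue_name = ? AND state IN ('PENDING', 'RUNNING')"
          + "), paged_jobs AS ("
          + "  SELECT id, state, created_at FROM job_queue"
          + "  WHERE queue_name = ? AND state IN ('PENDING', 'RUNNING')"
          + "  ORDER BY created_at"
          + "  LIMIT ? OFFSET ?"
          + ") "
          + "SELECT paged_jobs.*, total_jobs.total_count "
          + "FROM total_jobs LEFT JOIN paged_jobs ON true";

    // Converts a 1-based page number into the OFFSET bind value.
    static long offsetFor(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        return (long) (page - 1) * pageSize;
    }
}
```

The four placeholders would be bound, in order, to the queue name (twice), the page size, and the value from `offsetFor`.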
Moved job result set mapping logic from `PostgresJobQueue` to new `DBJobTransformer` utility class. This change improves code modularity, readability, and maintainability by isolating the transformation logic in a dedicated class.
Implemented a public getter method for JobQueue in JobQueueManagerAPI. Updated corresponding tests to include assertions for the new method. This enhances testability and ensures the JobQueue dependency is correctly injected.
Decoupled job cancellation logic by introducing a `Cancellable` interface. This improves clarity and separation of concerns, ensuring only processors capable of cancellation implement the relevant method. Updated existing tests to support the new interface and ensured compatibility with the JobQueueManagerAPI.
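
The separation described in this commit can be sketched as follows. The interfaces here are simplified stand-ins that take a job id rather than a `Job`, and `tryCancel` is a hypothetical helper name; only the idea that cancellation is offered by processors implementing `Cancellable` comes from the PR.

```java
// Simplified stand-in for JobProcessor.
interface ProcessorSketch {
    void process(String jobId);
}

// Simplified stand-in for Cancellable, extending the processor type
// as the class diagram suggests.
interface CancellableSketch extends ProcessorSketch {
    void cancel(String jobId);
}

// Sketch only: the caller checks for the capability before cancelling.
final class CancellationDispatchSketch {

    // Returns true if the processor supported cancellation and was asked to cancel.
    static boolean tryCancel(ProcessorSketch processor, String jobId) {
        if (processor instanceof CancellableSketch) {
            ((CancellableSketch) processor).cancel(jobId);
            return true;
        }
        return false;
    }
}
```

Processors that cannot be interrupted simply do not implement the second interface, so they carry no empty `cancel` method.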
This commit introduces a new `JobCancellingEvent` and related logic to handle job cancellations. It also refines the job state management and progress tracking to ensure accurate updates and status changes.
Introduce markAsRunning() method to standardize setting job states to RUNNING. Replace usage of lastRetryTimestamp with completedAt for retry timing. Update tests to reflect changes and improve code readability. Ensure jobs are correctly re-queued with updated states.
@jgambarios jgambarios linked an issue Sep 27, 2024 that may be closed by this pull request
@fabrizzio-dotCMS fabrizzio-dotCMS marked this pull request as ready for review September 30, 2024 15:23
@nollymar nollymar added this pull request to the merge queue Sep 30, 2024
Merged via the queue into master with commit b9dfc8a Sep 30, 2024
35 checks passed
@nollymar nollymar deleted the issue-29479-Create-a-default-JobQueue-Implementation-using-Postgres branch September 30, 2024 17:44
Successfully merging this pull request may close these issues.

Create a default JobQueue Implementation using Postgres